AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータを”改行コード付きで”Putする(AWS CDK)

AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータを”改行コード付きで”Putする(AWS CDK)

Clock Icon2022.08.09

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

前回の下記エントリでStep FunctionsからDelivery streamにJsonデータをPutする実装をしてみました。

その際にDelivery streamではProcessorの設定によりレコード間の区切り記号として改行コードを自動挿入されるようにしました。これによりAthenaでクエリ可能なJSON Lines形式のオブジェクトとしてBucketに配信されるため、Putを実施するStep Functions側では区切り記号を意識する必要はありませんでした。

今回は、上記とは別のアプローチとして、Processorを設定せずに、Step Functionsから改行コード付きでPutする実装を AWS CDKで行ってみました。

やってみた

実装

AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。

import { Construct } from 'constructs';
import {
  aws_s3,
  aws_stepfunctions_tasks,
  aws_stepfunctions,
  Stack,
  StackProps,
  RemovalPolicy,
  Duration,
  Size,
} from 'aws-cdk-lib';
import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as firehose_destinations_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';

export class ProcessStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // データ格納バケット
    const dataBucket = new aws_s3.Bucket(this, 'dataBucket', {
      bucketName: `data-${this.account}-${this.region}`,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    // Kinesis Firehose Delivery Stream
    const deliveryStream = new firehose_alpha.DeliveryStream(
      this,
      'deliveryStream',
      {
        deliveryStreamName: 'deliveryStream',
        destinations: [
          new firehose_destinations_alpha.S3Bucket(dataBucket, {
            dataOutputPrefix: 'data/',
            errorOutputPrefix: 'error/!{firehose:error-output-type}/',
            bufferingInterval: Duration.seconds(60),
            bufferingSize: Size.mebibytes(64),
          }),
        ],
      }
    );

    // 改行コード文字列の取得
    const getNewLineCodeTask = new aws_stepfunctions.Pass(
      this,
      'getNewLineCodeTask',
      {
        parameters: {
          value: '\n',
        },
        resultPath: '$.getNewLineCodeTaskOutPut',
      }
    );

    // Delivery streamへレコードをPutするタスク
    const putDataToDeliveryStreamTask =
      new aws_stepfunctions_tasks.CallAwsService(
        this,
        'putDataToDeliveryStreamTask',
        {
          service: 'firehose',
          action: 'putRecord',
          parameters: {
            DeliveryStreamName: deliveryStream.deliveryStreamName,
            Record: {
              Data: aws_stepfunctions.JsonPath.format(
                `{}{}`,
                aws_stepfunctions.JsonPath.jsonToString(
                  aws_stepfunctions.JsonPath.objectAt('$.input.record')
                ),
                aws_stepfunctions.JsonPath.stringAt(
                  '$.getNewLineCodeTaskOutPut.value'
                )
              ),
            },
          },
          iamResources: [deliveryStream.deliveryStreamArn],
          iamAction: 'firehose:PutRecord',
          resultPath: aws_stepfunctions.JsonPath.DISCARD,
        }
      );

    // State Machine
    new aws_stepfunctions.StateMachine(this, 'stateMachine', {
      stateMachineName: 'stateMachine',
      definition: getNewLineCodeTask.next(putDataToDeliveryStreamTask),
    });
  }
}

上記をCDK Deployしてスタックをデプロイすると、次の定義のState Machineが作成されます。

{
  "StartAt": "getNewLineTask",
  "States": {
    "getNewLineTask": {
      "Type": "Pass",
      "ResultPath": "$.getNewLineTaskOutPut",
      "Parameters": {
        "newLine": "\n"
      },
      "Next": "putDataToDeliveryStreamTask"
    },
    "putDataToDeliveryStreamTask": {
      "End": true,
      "Type": "Task",
      "ResultPath": null,
      "Resource": "arn:aws:states:::aws-sdk:firehose:putRecord",
      "Parameters": {
        "DeliveryStreamName": "deliveryStream",
        "Record": {
          "Data.$": "States.Format('{}{}', States.JsonToString($.input.record), $.getNewLineTaskOutPut.newLine)"
        }
      }
    }
  }
}

動作確認

次のInputを指定してState Machineを2回連続で実行します。

{
    "input": {
      "record": {
        "id": "u001",
        "count": 1,
        "area": "Akihabara"
      }
    }
}

いずれのState Machine実行も成功しました。

TaskのEventを見ると、JSONデータの文字列末尾に改行コードが付与されてPutされていることが分かります。

{
  "resourceType": "aws-sdk:firehose",
  "resource": "putRecord",
  "region": "ap-northeast-1",
  "parameters": {
    "DeliveryStreamName": "deliveryStream",
    "Record": {
      "Data": "{\"id\":\"u001\",\"count\":1,\"area\":\"Akihabara\"}\n"
    }
  },
  "timeoutInSeconds": null,
  "heartbeatInSeconds": null
}

Buffer期間が経過後に出力先Bucketの内容を確認すると、オブジェクトが1つ出力されています。

$ aws s3 ls s3://${BUCKET_NAME} --recursive   
2022-08-09 23:15:21         86 data/2022/08/09/14/deliveryStream-1-2022-08-09-14-14-19-04042654-2a79-4bba-bcf7-f926ff1eac7c

オブジェクトの内容は以下の通りです。JSONデータが改行コード区切りで記載されていますね!

{"id":"u001","count":1,"area":"Akihabara"}
{"id":"u001","count":1,"area":"Akihabara"}

出来そうで出来なかった実装

今回の実装では改行コード文字列(\n)の取得をするタスク(getNewLineCodeTask)を作成し、その戻り値をDelivery streamへレコードをPutするタスク(putDataToDeliveryStreamTask)で使用するようにしました。

当初改行コードをputDataToDeliveryStreamTask内で直接記述して使用しようとしましたが、下記の実装のどのパターンでもCDK Deploy時にSCHEMA_VALIDATION_FAILEDとなりました。

            Record: {
              Data: aws_stepfunctions.JsonPath.format(
                `{}\\n`,
                aws_stepfunctions.JsonPath.jsonToString(
                  aws_stepfunctions.JsonPath.objectAt('$.input.record')
                )
              ),
            },
            Record: {
              Data: aws_stepfunctions.JsonPath.format(
                `{}{}`,
                aws_stepfunctions.JsonPath.jsonToString(
                  aws_stepfunctions.JsonPath.objectAt('$.input.record')
                ),
                '\n'
              ),
            },
            Record: {
              Data: aws_stepfunctions.JsonPath.format(
                `{}\n`,
                aws_stepfunctions.JsonPath.jsonToString(
                  aws_stepfunctions.JsonPath.objectAt('$.input.record')
                )
              ),
            },
            Record: {
              Data: aws_stepfunctions.JsonPath.format(
                `{}\\\n`,
                aws_stepfunctions.JsonPath.jsonToString(
                  aws_stepfunctions.JsonPath.objectAt('$.input.record')
                )
              ),
            },
cdk deploy ProcessStack

✨  Synthesis time: 2.89s

ProcessStack: deploying...
[0%] start: Publishing 25d80fb5896d88cb53115944560d6c42f04f538a1da18cc2c63deae666dae0e4:current_account-ap-northeast-1
[100%] success: Published 25d80fb5896d88cb53115944560d6c42f04f538a1da18cc2c63deae666dae0e4:current_account-ap-northeast-1
ProcessStack: creating CloudFormation changeset...
2:45:21 AM | UPDATE_FAILED        | AWS::StepFunctions::StateMachine     | stateMachineE926C166
Resource handler returned message: "Invalid State Machine Definition: 'SCHEMA_VALIDATION_FAILED: The value for the field 'Data.$' must be a valid JSONPath or a valid intrinsic function call at /States/putDataToDeliveryStreamTask/Parameters' (Service: AWSStepFunctions; Status Code:400; Error Code: InvalidDefinition; Request ID: 1c701d23-32d8-47a9-85dd-2970e3ae74c8; Proxy: null)" (RequestToken: ba39682b-a163-44dc-a44f-763c1e49b412, HandlerErrorCode: InvalidRequest)

また次のように実装した場合。

            Record: {
              Data: aws_stepfunctions.JsonPath.format(
                `{}\\\\n`,
                aws_stepfunctions.JsonPath.jsonToString(
                  aws_stepfunctions.JsonPath.objectAt('$.input.record')
                )
              ),
            },

出力されるデータの内容は次のようにJSON Lines形式となりませんでした。

{"id":"u001","count":1,"area":"Akihabara"}\\n{"id":"u001","count":1,"area":"Akihabara"}\\n

よって、改行コード文字列(\n)の取得をするタスクを別途作成する必要がありました。

おわりに

AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータを改行コード付きでPutする実装をAWS CDKで作ってみました。

Delivery stream側でProcessorを設定する場合と比べて実装がだいぶシンプルになった気がします。Dynamic Partitioningの機能を利用する場合は前回の紹介した方法、利用しない場合は今回の方法と使い分けできれば良いと思いました。

以上

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.